Documentation Index Fetch the complete documentation index at: https://mintlify.com/FrankDevg/imbd_scrapper_project/llms.txt
Use this file to discover all available pages before exploring further.
The IMDb Scraper implements a hybrid persistence strategy that simultaneously saves data to PostgreSQL and CSV files, using the Repository pattern for clean separation of concerns.
Hybrid Persistence Architecture
The system uses a composite use case to orchestrate dual persistence:
class CompositeSaveMovieWithActorsUseCase ( UseCaseInterface ):
def __init__ ( self , use_cases : List[UseCaseInterface]):
self .use_cases = use_cases
self .max_workers = len (use_cases)
def execute ( self , movie : Movie) -> None :
with ThreadPoolExecutor( max_workers = self .max_workers) as executor:
# Execute all persistence strategies in parallel
list (executor.map( lambda uc : uc.execute(movie), self .use_cases))
Location: application/use_cases/composite_save_movie_with_actors_use_case.py:25
PostgreSQL Persistence
Schema Design
The database implements a normalized relational schema:
-- Movies table
CREATE TABLE movies (
id SERIAL PRIMARY KEY ,
imdb_id VARCHAR ( 20 ) UNIQUE NOT NULL ,
title VARCHAR ( 255 ) NOT NULL ,
year INTEGER ,
rating NUMERIC ( 3 , 1 ),
duration_minutes INTEGER ,
metascore INTEGER
);
-- Actors table
CREATE TABLE actors (
id SERIAL PRIMARY KEY ,
name VARCHAR ( 255 ) UNIQUE NOT NULL
);
-- Many-to-many relationship
CREATE TABLE movie_actor (
movie_id INTEGER REFERENCES movies(id) ON DELETE CASCADE ,
actor_id INTEGER REFERENCES actors(id) ON DELETE CASCADE ,
PRIMARY KEY (movie_id, actor_id)
);
Location: sql/01_schema.sql
MoviePostgresRepository
The PostgreSQL repository uses stored procedures for data integrity:
class MoviePostgresRepository ( MovieRepository ):
def __init__ ( self , conn ):
self .conn = conn
def save ( self , movie : Movie) -> Movie:
try :
with self .conn.cursor() as cur:
cur.execute( "SELECT * FROM upsert_movie( %s , %s , %s , %s , %s , %s );" , (
movie.imdb_id, movie.title, movie.year, movie.rating,
movie.duration_minutes, movie.metascore
))
movie_data = cur.fetchone()
return Movie(
id = movie_data[ 0 ],
imdb_id = movie_data[ 1 ],
title = movie_data[ 2 ],
year = movie_data[ 3 ],
rating = float (movie_data[ 4 ]),
duration_minutes = movie_data[ 5 ],
metascore = movie_data[ 6 ],
actors = []
)
except DatabaseError as e:
logger.error( f "Error al guardar película ' { movie.title } ': { e } " )
self .conn.rollback()
raise
Location: infrastructure/persistence/postgres/repositories/movie_postgres_repository.py:16
Upsert Stored Procedure
The database uses stored procedures to handle INSERT/UPDATE logic:
CREATE OR REPLACE FUNCTION upsert_movie (
p_imdb_id VARCHAR ,
p_title VARCHAR ,
p_year INTEGER ,
p_rating NUMERIC ,
p_duration_minutes INTEGER ,
p_metascore INTEGER
)
RETURNS TABLE (
id INTEGER ,
imdb_id VARCHAR ,
title VARCHAR ,
year INTEGER ,
rating NUMERIC ,
duration_minutes INTEGER ,
metascore INTEGER
)
AS $$
BEGIN
RETURN QUERY
INSERT INTO movies (imdb_id, title, year , rating, duration_minutes, metascore)
VALUES (p_imdb_id, p_title, p_year, p_rating, p_duration_minutes, p_metascore)
ON CONFLICT (imdb_id) DO UPDATE
SET title = EXCLUDED . title ,
year = EXCLUDED . year ,
rating = EXCLUDED . rating ,
duration_minutes = EXCLUDED . duration_minutes ,
metascore = EXCLUDED . metascore
RETURNING movies. * ;
END ;
$$ LANGUAGE plpgsql;
Location: sql/02_procedures.sql
Query Operations
The repository provides query methods:
def find_by_imdb_id ( self , imdb_id : str ) -> Optional[Movie]:
try :
with self .conn.cursor() as cur:
cur.execute( """
SELECT id, imdb_id, title, year, rating, duration_minutes, metascore
FROM movies WHERE imdb_id = %s
""" , (imdb_id,))
movie_data = cur.fetchone()
if movie_data:
return Movie(
id = movie_data[ 0 ],
imdb_id = movie_data[ 1 ],
title = movie_data[ 2 ],
year = movie_data[ 3 ],
rating = float (movie_data[ 4 ]),
duration_minutes = movie_data[ 5 ],
metascore = movie_data[ 6 ],
actors = []
)
return None
except DatabaseError as e:
logger.error( f "Error al buscar película por imdb_id ' { imdb_id } ': { e } " )
self .conn.rollback()
return None
Location: infrastructure/persistence/postgres/repositories/movie_postgres_repository.py:43
Connection Configuration
POSTGRES_DB = os.getenv( "POSTGRES_DB" , "imdb_scraper" )
POSTGRES_USER = os.getenv( "POSTGRES_USER" , "aruiz" )
POSTGRES_PASSWORD = os.getenv( "POSTGRES_PASSWORD" , "@ndresruiz@123" )
POSTGRES_HOST = os.getenv( "POSTGRES_HOST" , "postgres" )
POSTGRES_PORT = os.getenv( "POSTGRES_PORT" , "5432" )
POSTGRES_MAX_CONNECTIONS = 10
Location: shared/config/config.py:70
CSV Persistence
MovieCsvRepository
The CSV repository handles file-based persistence with thread safety:
MOVIES_CSV = "data/movies.csv"
MOVIE_HEADERS = [ "id" , "imdb_id" , "title" , "year" , "rating" , "duration_minutes" , "metascore" ]
movie_lock = threading.Lock()
class MovieCsvRepository ( MovieRepository ):
def __init__ ( self ):
os.makedirs(os.path.dirname( MOVIES_CSV ), exist_ok = True )
if not os.path.exists( MOVIES_CSV ):
with open ( MOVIES_CSV , "w" , newline = "" , encoding = "utf-8" ) as f:
writer = csv.writer(f)
writer.writerow( MOVIE_HEADERS )
def _get_next_id ( self ) -> int :
with open ( MOVIES_CSV , "r" , newline = "" , encoding = "utf-8" ) as f:
reader = csv.reader(f)
next (reader) # Skip headers
last_id = 0
for row in reader:
if row:
last_id = int (row[ 0 ])
return last_id + 1
def save ( self , movie : Movie) -> Movie:
with movie_lock:
if movie.id is None :
movie.id = self ._get_next_id()
with open ( MOVIES_CSV , "a" , newline = "" , encoding = "utf-8" ) as f:
writer = csv.writer(f)
writer.writerow([
movie.id,
movie.imdb_id,
movie.title,
movie.year,
movie.rating,
movie.duration_minutes or "" ,
movie.metascore or ""
])
return movie
Location: infrastructure/persistence/csv/repositories/movie_csv_repository.py:12
Thread-Safe Operations
The CSV repository uses threading locks to prevent race conditions:
movie_lock = threading.Lock()
# In save method:
with movie_lock:
if movie.id is None :
movie.id = self ._get_next_id()
# Write operation
Location: infrastructure/persistence/csv/repositories/movie_csv_repository.py:10
CSV Query Operations
def find_by_imdb_id ( self , imdb_id : str ) -> Optional[Movie]:
with movie_lock:
with open ( MOVIES_CSV , "r" , newline = "" , encoding = "utf-8" ) as f:
reader = csv.DictReader(f)
for row in reader:
if row[ "imdb_id" ] == imdb_id:
return Movie(
id = int (row[ "id" ]),
imdb_id = row[ "imdb_id" ],
title = row[ "title" ],
year = int (row[ "year" ]),
rating = float (row[ "rating" ]),
duration_minutes = int (row[ "duration_minutes" ]) if row[ "duration_minutes" ] else None ,
metascore = int (row[ "metascore" ]) if row[ "metascore" ] else None ,
actors = []
)
return None
Location: infrastructure/persistence/csv/repositories/movie_csv_repository.py:56
Repository Pattern Benefits
Interface Abstraction
Both repositories implement the same interface:
class MovieRepository ( ABC ):
@abstractmethod
def save ( self , movie : Movie) -> Movie:
pass
@abstractmethod
def find_by_imdb_id ( self , imdb_id : str ) -> Optional[Movie]:
pass
Location: domain/repositories/movie_repository.py
Dependency Injection
The repository implementation is injected at runtime, allowing easy swapping:
# PostgreSQL repository
movie_repo = MoviePostgresRepository(conn)
# CSV repository
movie_repo = MovieCsvRepository()
# Both implement the same interface
movie = movie_repo.save(movie)
Use Case Orchestration
Composite Use Case
The composite pattern allows executing multiple persistence strategies:
# Create individual use cases
postgres_use_case = SaveMovieWithActorsPostgresUseCase(
movie_repo = postgres_movie_repo,
actor_repo = postgres_actor_repo,
movie_actor_repo = postgres_movie_actor_repo
)
csv_use_case = SaveMovieWithActorsCsvUseCase(
movie_repo = csv_movie_repo,
actor_repo = csv_actor_repo,
movie_actor_repo = csv_movie_actor_repo
)
# Combine them
composite_use_case = CompositeSaveMovieWithActorsUseCase(
use_cases = [postgres_use_case, csv_use_case]
)
# Execute both in parallel
composite_use_case.execute(movie)
Location: application/use_cases/composite_save_movie_with_actors_use_case.py
Data Flow
movies.csv
id, imdb_id, title, year, rating, duration_minutes, metascore
1, tt0111161, The Shawshank Redemption, 1994, 9.3, 142, 82
2, tt0068646, The Godfather, 1972, 9.2, 175, 100
actors.csv
id, name
1, Tim Robbins
2, Morgan Freeman
3, Marlon Brando
movie_actor.csv
movie_id, actor_id
1, 1
1, 2
2, 3
Location: data/
Configuration
PostgreSQL Settings
POSTGRES_DB = imdb_scraper
POSTGRES_USER = aruiz
POSTGRES_PASSWORD = your_password
POSTGRES_PORT = 5432
POSTGRES_HOST = postgres
CSV Settings
CSV files are automatically created in the data/ directory with proper headers and UTF-8 encoding.
Error Handling
PostgreSQL Error Recovery
try :
cur.execute( "SELECT * FROM upsert_movie( %s , %s , %s , %s , %s , %s );" , ( ... ))
movie_data = cur.fetchone()
return Movie( ... )
except DatabaseError as e:
logger.error( f "Error al guardar película ' { movie.title } ': { e } " )
self .conn.rollback()
raise
CSV Error Recovery
CSV operations are wrapped in locks and automatically create missing directories:
os.makedirs(os.path.dirname( MOVIES_CSV ), exist_ok = True )
Next Steps
Scraping Engine Learn how data is extracted
Concurrency Explore parallel data processing